package net.gdface.facelog;

import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import javax.management.InstanceNotFoundException;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.activemq.broker.jmx.BrokerViewMBean;
import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import gu.simplemq.Constant;
import gu.simplemq.IMessageQueueFactory;
import gu.simplemq.MQProperties;
import gu.simplemq.MessageQueueFactorys;
import gu.simplemq.MessageQueueType;
import net.gdface.utils.BaseVolatile;
import net.gdface.utils.ILazyInitVariable;
import net.gdface.utils.NetworkUtil;
import static com.google.common.base.Preconditions.*;
import static gu.dtalk.activemq.ActivemqContext.HELPER;

/**
 * 本地 ACTIVEMQ 服务控制器<br>
 * @author guyadong
 *
 */
class ActivemqController extends BaseMQController implements Constant {
	public static final String PROTOCOL_OPENWIRE = "openwire";
	public static final String PROTOCOL_MQTT = "mqtt";
	public static final String PROTOCOL_AMQP = "amqp";
	public static final String PROTOCOL_STOMP = "stomp";
	public static final String DEFAULT_AMQ_SCHEMA = "tcp";
	public static final String DEFAULT_WS_CONNECTOR = "ws://localhost:61614";
	protected static final int DEFAULT_JMXPORT=1099;
	protected static final int JMXPORT = CONFIG.getInt(ACTIVEMQ_JMXPORT,DEFAULT_JMXPORT);
	/** 本地服务是否为嵌入服务标志 */
	private final boolean embedded;
	private static volatile Map<String, String> transportConnectors;
	/** 连接用户名,没定义则返回空 */
	private final ILazyInitVariable<String> username = new BaseVolatile<String>() {
		@Override
		protected String doGet() {
			String username = CONFIG.getString(MQ_CONNECT_USERNAME,"");
			return username;
		}
	};
	/** 连接密码,没定义则返回空 */
	private final ILazyInitVariable<String> password = new BaseVolatile<String>() {
		@Override
		protected String doGet() {
			String password = CONFIG.getString(MQ_CONNECT_PASSWORD,"");
			return password;
		}
	};
	private final LoadingCache<String, URI> localTransportURIs = CacheBuilder.newBuilder()
			.build(new CacheLoader<String, URI>(){

		@Override
		public URI load(String key) throws Exception {
			return getTransportURIByProtocol(key);
		}});
	protected final Map<String, Object> originConnParameters;
	protected final MQProperties initializedProperties;
	/**
	 * 用于独立启动服务的构造方法
	 * @param factory
	 */
	protected ActivemqController(IMessageQueueFactory factory) {
		super(factory);
		this.embedded = false;
		if(factory != null){
			checkArgument(MessageQueueType.ACTIVEMQ.equals(factory.getImplType()),
					"%s message type required  for %s,but type of %s is %s",MessageQueueType.ACTIVEMQ,
					factory.getClass().getSimpleName(),factory.getImplType());	
			originConnParameters = factory.getMQConnParameters();		
			initializedProperties = HELPER.initParameters(originConnParameters);
			location =  initializedProperties.getLocation();
		}else{
			originConnParameters = Collections.emptyMap();
			initializedProperties = HELPER.asMQProperties(null);
			location = URI.create("tcp://localhost");
		}
		waitIfAbsent = CONFIG.getBoolean(ACTIVEMQ_WAITIFABSENT,false);
		tryCountLimit = CONFIG.getInt(ACTIVEMQ_TRYCOUNT,DEFAULT_TRY_COUNT);
		tryInterval = CONFIG.getLong(ACTIVEMQ_TRYINTERVAL,DEFAULT_TRY_INTERVAL);
	}
	/**
	 * 用于嵌入式服务的构造方法
	 * @param connParameters
	 */
	@SuppressWarnings({ "unchecked", "rawtypes" })
	protected ActivemqController(Map connParameters) {
		super(null);
		this.embedded = true;
		originConnParameters = (Map)checkNotNull(connParameters,"connParameters is null");
		initializedProperties = HELPER.initParameters(connParameters);
		location =  initializedProperties.getLocation();
		waitIfAbsent = CONFIG.getBoolean(ACTIVEMQ_WAITIFABSENT,false);
		tryCountLimit = CONFIG.getInt(ACTIVEMQ_TRYCOUNT,DEFAULT_TRY_COUNT);
		tryInterval = CONFIG.getLong(ACTIVEMQ_TRYINTERVAL,DEFAULT_TRY_INTERVAL);
	}
	@Override
	protected boolean canStartLocal(){
		return isEmbedded() || null != CONFIG.getString(ACTIVEMQ_EXE);
	}
	@Override
	protected boolean testConnect(){
		if(factory != null && factory.initialized()){
			return super.testConnect();
		}
		try {
			getLocalTransportConnectors();
			return true;
		} catch (IOException | InstanceNotFoundException e) {
			return false;
		}
	}
	@Override
	protected boolean isEmbedded() {
		return embedded;
	}

	/**
	 * 使用JMX返回本地服务的连接参数
	 * @throws IOException 无法连接服务
	 * @throws InstanceNotFoundException 无法连接服务
	 */
	private static Map<String, String> getLocalTransportConnectors() throws IOException, InstanceNotFoundException {
		// double check
		if(transportConnectors == null){
			synchronized (ActivemqController.class) {
				if(transportConnectors == null){					
					String name = "org.apache.activemq:type=Broker,brokerName=localhost";
					try { 
						ObjectName objectName = new ObjectName(name);
						//			if (mbs.isRegistered(objectName)) {
						//				String port = System.getProperty("com.sun.management.jmxremote.port");
						JMXServiceURL url =
								new JMXServiceURL(String.format("service:jmx:rmi:///jndi/rmi://localhost:%d/jmxrmi",JMXPORT));
						JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
						MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
						BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(
										mbsc,objectName,BrokerViewMBean.class,true);
						transportConnectors = mbean.getTransportConnectors();
						//		}
					} catch (UndeclaredThrowableException e) {
						if(null != e.getCause()){
							Throwables.throwIfInstanceOf(e.getCause(), InstanceNotFoundException.class);
						}
						throw e;
					}catch (Exception e) {
						Throwables.throwIfInstanceOf(e, IOException.class);
						Throwables.throwIfUnchecked(e);
						throw new RuntimeException(e);
					}
				}
			}
		}
		return transportConnectors;
	}
	private static URI getTransportURIByProtocol(String protocol) throws IOException, InstanceNotFoundException{
		try {
			URI uri = new URI(checkNotNull(getLocalTransportConnectors().get(protocol),"not define transport that named openwire"));
			return  new URI(uri.getScheme(),null,"localhost",uri.getPort(),null,null,null);
		} catch (Exception e) {
			Throwables.throwIfInstanceOf(e, IOException.class);
			Throwables.throwIfInstanceOf(e, InstanceNotFoundException.class);			
			Throwables.throwIfUnchecked(e);
			throw new RuntimeException(e);
		}
	}
	private static boolean setTransportIfNoNull(Properties props,String key,String protocol) throws InstanceNotFoundException, IOException{
		URI uri = getTransportURIByProtocol(protocol);
		if(uri != null){
			props.setProperty(key, uri.toString());
			return true;			
		}
		return false;
	}
	public URI getLocalTransportURI(String protocol){
		return localTransportURIs.getUnchecked(protocol);
	}
	public URI getLocalURI(){
		return getLocalTransportURI(PROTOCOL_AMQP);
	}
	protected String getUsername(){
		return username.get();
	}
	protected String getPassword(){
		return password.get();
	}
	@SuppressWarnings({ "unchecked", "rawtypes" })
	static ActivemqController makeActivemqController(){
		IMessageQueueFactory factory = MessageQueueFactorys.getFactory(MessageQueueType.ACTIVEMQ);
		Map<MQParam, String> messageQueueConfig = GlobalConfig.makeMessageQueueMqConfig();
		String connJson = messageQueueConfig.get(MQParam.MQ_CONNECT);
		Map<String, String> connParam = MessageQueueFactorys.asMQConnParam2(connJson);
		MQProperties properties = HELPER.initParameters(connParam);		
		URI location = HELPER.getLocation(properties);
		if(NetworkUtil.selfBind(location.getHost())){
			try {
				// 本地服务已经启动则将配置要求的通信协议对应的参数更新到properties
				String protocol = location.getScheme();
				if(protocol.equals("tcp")){
					protocol = PROTOCOL_OPENWIRE;
				}
				checkState(setTransportIfNoNull(properties, MQ_URI, protocol),
						"ACTIVEMQ SERVICE not avaiable,caused by not found valid transport {}",
						protocol);
				if(setTransportIfNoNull(properties, MQ_PUBSUB_URI, PROTOCOL_MQTT)){
					properties.setProperty(MQ_PUBSUB_MQTT, Boolean.TRUE.toString());
				}

			} catch (IOException | InstanceNotFoundException e) {
				// 本地服务未启动
				return new ActivemqControllerEmbedded(connParam);
			}
		}
		factory.init((Map)properties);
		return new ActivemqControllerLocal(factory);
	}
}
